diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 6907266b33..10dda41461 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -30,6 +30,7 @@ import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; def class ObservableTests { @@ -61,8 +62,12 @@ def class ObservableTests { @Test public void testLast() { - new TestFactory().getObservable().last().subscribe({ result -> a.received(result)}); - verify(a, times(1)).received("hello_1"); + assertEquals("three", Observable.toObservable("one", "two", "three").last()) + } + + @Test + public void testLastWithPredicate() { + assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3})) } @Test @@ -175,6 +180,12 @@ def class ObservableTests { verify(a, times(0)).received(3); } + @Test + public void testTakeLast() { + new TestFactory().getObservable().takeLast(1).subscribe({ result -> a.received(result)}); + verify(a, times(1)).received("hello_1"); + } + @Test public void testTakeWhileViaGroovy() { Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)}); @@ -280,7 +291,7 @@ def class ObservableTests { observer.onCompleted(); } }).start(); - return Observable.noOpSubscription(); + return Subscriptions.empty(); } } diff --git a/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java b/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java index b5b3bc5bc2..c5614b12b9 100644 --- a/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java +++ b/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java @@ -84,7 +84,7 @@ public void testFilterViaGroovy() { @Test public void testLast() { - String script = "mockApiCall.getObservable().last().subscribe(lambda{|result| a.received(result)})"; + String script = "mockApiCall.getObservable().takeLast(1).subscribe(lambda{|result| a.received(result)})"; runGroovyScript(script); verify(assertion, times(1)).received("hello_1"); } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala index 5555b51290..584c7c587f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala @@ -160,10 +160,10 @@ class UnitTestSuite extends JUnitSuite { verify(assertion, times(1)).received(List(2,4,6,8)) } - @Test def testLast() { + @Test def testTakeLast() { val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) - numbers.last().subscribe((callback: Int) => { - println("testLast: onNext -> got " + callback) + numbers.takeLast(1).subscribe((callback: Int) => { + println("testTakeLast: onNext -> got " + callback) assertion.received(callback) }) verify(assertion, times(1)).received(9) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d7a7616bb6..2a48c7eb8e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,7 +41,6 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; -import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -537,7 +536,6 @@ public Subscription call(Observer t1) { } } - /** * an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes. * @@ -807,18 +805,44 @@ public static Observable just(T value) { } /** - * Takes the last item emitted by a source Observable and returns an Observable that emits only - * that item as its sole emission. - *

- * + * Returns the last element of an observable sequence with a specified source. + * + * @param that + * the source Observable + * @return the last element in the observable sequence. + */ + public static T last(final Observable that) { + T result = null; + for (T value : that.toIterable()) { + result = value; + } + return result; + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param that + * the source Observable + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public static T last(final Observable that, final Func1 predicate) { + return last(that.filter(predicate)); + } + + /** + * Returns the last element of an observable sequence that matches the predicate. * * @param that * the source Observable - * @return an Observable that emits a single item, which is identical to the last item emitted - * by the source Observable + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. */ - public static Observable last(final Observable that) { - return _create(OperationLast.last(that)); + public static T last(final Observable that, final Object predicate) { + return last(that.filter(predicate)); } /** @@ -1363,7 +1387,7 @@ public static Observable onErrorReturn(final Observable that, Func1Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, Func2 accumulator) { - return last(_create(OperationScan.scan(sequence, accumulator))); + return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1); } /** @@ -1435,7 +1459,7 @@ public T call(T t1, T t2) { * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { - return last(_create(OperationScan.scan(sequence, initialValue, accumulator))); + return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** @@ -2364,17 +2388,44 @@ public Boolean call(T t1) { } /** - * Converts an Observable that emits a sequence of objects into one that only emits the last - * object in this sequence before completing. - *

- * + * Returns the last element of an observable sequence with a specified source. * - * @return an Observable that emits only the last item emitted by the original Observable + * @return the last element in the observable sequence. */ - public Observable last() { + public T last() { return last(this); } + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public T last(final Func1 predicate) { + return last(this, predicate); + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public T last(final Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return last(this, new Func1() { + @Override + public Boolean call(T args) { + return (Boolean) _f.call(args); + } + }); + } + /** * Returns the last element, or a default value if no value is found. * @@ -3333,6 +3384,32 @@ public Boolean call(String args) { }); } + @Test + public void testLast() { + Observable obs = Observable.toObservable("one", "two", "three"); + + assertEquals("three", obs.last()); + } + + @Test + public void testLastWithPredicate() { + Observable obs = Observable.toObservable("one", "two", "three"); + + assertEquals("two", obs.last(new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })); + } + + @Test + public void testLastEmptyObservable() { + Observable obs = Observable.toObservable(); + + assertNull(obs.last()); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationLast.java b/rxjava-core/src/main/java/rx/operators/OperationLast.java deleted file mode 100644 index 6b83a74263..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationLast.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; -import org.mockito.Mockito; - -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.util.functions.Func1; - -/** - * Returns the last element of an observable sequence. - * - * @param - */ -public final class OperationLast { - - public static Func1, Subscription> last(Observable observable) { - return new Last(observable); - } - - private static class Last implements Func1, Subscription> { - - private final AtomicReference lastValue = new AtomicReference(); - private final Observable that; - private final AtomicBoolean onNextCalled = new AtomicBoolean(false); - - public Last(Observable that) { - this.that = that; - } - - public Subscription call(final Observer observer) { - return that.subscribe(new Observer() { - public void onNext(T value) { - onNextCalled.set(true); - lastValue.set(value); - } - - public void onError(Exception ex) { - observer.onError(ex); - } - - public void onCompleted() { - if (onNextCalled.get()) { - observer.onNext(lastValue.get()); - } - observer.onCompleted(); - } - }); - } - } - - public static class UnitTest { - - @Test - public void testLast() { - Observable w = Observable.toObservable("one", "two", "three"); - Observable observable = Observable.create(last(w)); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - observable.subscribe(aObserver); - verify(aObserver, Mockito.never()).onNext("one"); - verify(aObserver, Mockito.never()).onNext("two"); - verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - } -} \ No newline at end of file