diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ce47c7085c..6bcd3e9f77 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,6 +49,7 @@ import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationInterval; +import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -4461,12 +4462,21 @@ public Observable> groupBy(final Func1 * In Rx.Net this is negated as the any operator but renamed in RxJava to better match Java naming idioms. * - * @return A subscription function for creating the target Observable. + * @return An Observable that emits Boolean. * @see MSDN: Observable.Any */ public Observable isEmpty() { return create(OperationAny.isEmpty(this)); } + + /** + * Returns an {@link Observable} that emits the last element of the source or an IllegalArgumentException if the source {@link Observable} is empty. + * + * @return Observable + */ + public Observable last() { + return create(OperationLast.last(this)); + } /** * Converts an Observable into a {@link BlockingObservable} (an Observable with blocking diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 36f372685e..8fe13c9e88 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -178,13 +178,10 @@ public Iterator getIterator() { * * * @return the last item emitted by the source {@link Observable} + * @throws IllegalArgumentException if source contains no elements */ public T last() { - T result = null; - for (T value : toIterable()) { - result = value; - } - return result; + return new BlockingObservable(o.last()).single(); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationLast.java b/rxjava-core/src/main/java/rx/operators/OperationLast.java new file mode 100644 index 0000000000..964afd5176 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationLast.java @@ -0,0 +1,82 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Emit an Observable with the last emitted item + * or onError(new IllegalArgumentException("Sequence contains no elements")) if no elements received. + */ +public class OperationLast { + + /** + * Accepts a sequence and returns a sequence that is the last emitted item + * or an error if no items are emitted (empty sequence). + * + * @param sequence + * the input sequence. + * @param + * the type of the sequence. + * @return a sequence containing the last emitted item or that has onError invoked on it if no items + */ + public static OnSubscribeFunc last(final Observable sequence) { + return new OnSubscribeFunc() { + final AtomicReference last = new AtomicReference(); + final AtomicBoolean hasLast = new AtomicBoolean(false); + + @Override + public Subscription onSubscribe(final Observer observer) { + return sequence.subscribe(new Observer() { + + @Override + public void onCompleted() { + /* + * We don't need to worry about the following being non-atomic + * since an Observable sequence is serial so we will not receive + * concurrent executions. + */ + if (hasLast.get()) { + observer.onNext(last.get()); + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException("Sequence contains no elements")); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T value) { + last.set(value); + hasLast.set(true); + } + }); + } + + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index b546fd64bb..f3e05189c1 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -49,11 +49,10 @@ public void testLast() { assertEquals("three", obs.last()); } - @Test + @Test(expected = IllegalArgumentException.class) public void testLastEmptyObservable() { BlockingObservable obs = BlockingObservable.from(Observable.empty()); - - assertNull(obs.last()); + obs.last(); } @Test diff --git a/rxjava-core/src/test/java/rx/operators/OperationLastTest.java b/rxjava-core/src/test/java/rx/operators/OperationLastTest.java new file mode 100644 index 0000000000..006de63060 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationLastTest.java @@ -0,0 +1,49 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.Observable; + +public class OperationLastTest { + + @Test + public void testLastWithElements() { + Observable last = Observable.create(OperationLast.last(Observable.from(1, 2, 3))); + assertEquals(3, last.toBlockingObservable().single().intValue()); + } + + @Test(expected = IllegalArgumentException.class) + public void testLastWithNoElements() { + Observable last = Observable.create(OperationLast.last(Observable.empty())); + last.toBlockingObservable().single(); + } + + @Test + public void testLastMultiSubscribe() { + Observable last = Observable.create(OperationLast.last(Observable.from(1, 2, 3))); + assertEquals(3, last.toBlockingObservable().single().intValue()); + assertEquals(3, last.toBlockingObservable().single().intValue()); + } + + @Test + public void testLastViaObservable() { + Observable.from(1, 2, 3).last(); + } +}