From c42be1dcbedd103f27bd8a81e71cfd01ee3d5634 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 23 Sep 2013 13:12:08 +0800 Subject: [PATCH 1/5] Implemented 'cast' operator --- rxjava-core/src/main/java/rx/Observable.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c21913f86a..19cea34b99 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4332,6 +4332,23 @@ public BlockingObservable toBlockingObservable() { return BlockingObservable.from(this); } + /** + * Converts the elements of an observable sequence to the specified type. + * + * @return An observable sequence that contains each element of the source + * sequence converted to the specified type. + * + * @see MSDN: Observable.Cast + */ + public Observable cast() { + return map(new Func1() { + @SuppressWarnings("unchecked") + public R call(T t) { + return (R) t; + } + }); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

From 6788900bd336b5fc16230fc4f20d0ae1b52a82aa Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 23 Sep 2013 14:20:27 +0800 Subject: [PATCH 2/5] Added a klass parameter --- rxjava-core/src/main/java/rx/Observable.java | 17 ++--- .../main/java/rx/operators/OperationCast.java | 67 +++++++++++++++++++ 2 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationCast.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 19cea34b99..edb2222a38 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -33,6 +33,7 @@ import rx.operators.OperationAverage; import rx.operators.OperationBuffer; import rx.operators.OperationCache; +import rx.operators.OperationCast; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDebounce; @@ -4335,18 +4336,18 @@ public BlockingObservable toBlockingObservable() { /** * Converts the elements of an observable sequence to the specified type. * + * @param klass + * The target class type which the elements will be converted to. + * * @return An observable sequence that contains each element of the source * sequence converted to the specified type. * - * @see MSDN: Observable.Cast + * @see MSDN: + * Observable.Cast */ - public Observable cast() { - return map(new Func1() { - @SuppressWarnings("unchecked") - public R call(T t) { - return (R) t; - } - }); + public Observable cast(final Class klass) { + return create(OperationCast.cast(this, klass)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationCast.java b/rxjava-core/src/main/java/rx/operators/OperationCast.java new file mode 100644 index 0000000000..e0a20a57ea --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationCast.java @@ -0,0 +1,67 @@ +package rx.operators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.util.functions.Func1; + +/** + * Converts the elements of an observable sequence to the specified type. + */ +public class OperationCast { + + public static OnSubscribeFunc cast( + Observable source, final Class klass) { + return OperationMap.map(source, new Func1() { + @SuppressWarnings("unchecked") + public R call(T t) { + if (klass.isAssignableFrom(t.getClass())) { + return (R) t; + } else { + throw new ClassCastException(t.getClass() + + " cannot be cast to " + klass); + } + } + }); + } + + public static class UnitTest { + + @Test + public void testCast() { + Observable source = Observable.from(1, 2); + Observable observable = Observable.create(cast(source, + Integer.class)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testCastWithWrongType() { + Observable source = Observable.from(1, 2); + Observable observable = Observable.create(cast(source, + Boolean.class)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onError( + org.mockito.Matchers.any(ClassCastException.class)); + } + } + +} From cde74498f3bb397ce1301ff2031f4678b58d8d83 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 23 Sep 2013 14:39:06 +0800 Subject: [PATCH 3/5] Implemented the 'ofType' operator --- rxjava-core/src/main/java/rx/Observable.java | 21 +++++++++++++++++++ .../src/test/java/rx/ObservableTests.java | 17 +++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index edb2222a38..9d2ded57f2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4350,6 +4350,27 @@ public Observable cast(final Class klass) { return create(OperationCast.cast(this, klass)); } + /** + * Filters the elements of an observable sequence based on the specified + * type. + * + * @param klass + * The class type to filter the elements in the source sequence + * on. + * + * @return An observable sequence that contains elements from the input + * sequence of type klass. + * + * @see MSDN: Observable.OfType + */ + public Observable ofType(final Class klass) { + return filter(new Func1() { + public Boolean call(T t) { + return klass.isAssignableFrom(t.getClass()); + } + }).cast(klass); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 92af02b1dd..e278276b39 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -701,4 +701,21 @@ public void onNext(String v) { fail("It should be a NumberFormatException"); } } + + @Test + public void testOfType() { + Observable observable = Observable.from(1, "abc", false, 2L).ofType(String.class); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, times(1)).onNext("abc"); + verify(aObserver, never()).onNext(false); + verify(aObserver, never()).onNext(2L); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + } \ No newline at end of file From e0caeeea9e5d2c7e6c25342503a1e80d3217076d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 23 Sep 2013 18:56:20 +0800 Subject: [PATCH 4/5] Used 'cast' to remove SuppressWarnings --- rxjava-core/src/main/java/rx/operators/OperationCast.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCast.java b/rxjava-core/src/main/java/rx/operators/OperationCast.java index e0a20a57ea..a4887c3afe 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCast.java @@ -20,14 +20,8 @@ public class OperationCast { public static OnSubscribeFunc cast( Observable source, final Class klass) { return OperationMap.map(source, new Func1() { - @SuppressWarnings("unchecked") public R call(T t) { - if (klass.isAssignableFrom(t.getClass())) { - return (R) t; - } else { - throw new ClassCastException(t.getClass() - + " cannot be cast to " + klass); - } + return klass.cast(t); } }); } From 02fd9524a93d4fbb22353e9a3d15b6ef7653434b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 23 Sep 2013 19:09:18 +0800 Subject: [PATCH 5/5] Used 'isInstance' to replace 'isAssignableFrom' and added a unit test --- rxjava-core/src/main/java/rx/Observable.java | 2 +- .../src/test/java/rx/ObservableTests.java | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9d2ded57f2..35f3d0fe21 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4366,7 +4366,7 @@ public Observable cast(final Class klass) { public Observable ofType(final Class klass) { return filter(new Func1() { public Boolean call(T t) { - return klass.isAssignableFrom(t.getClass()); + return klass.isInstance(t); } }).cast(klass); } diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index e278276b39..297bb1ff47 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -20,6 +20,8 @@ import static org.mockito.Mockito.*; import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -718,4 +720,25 @@ public void testOfType() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testOfTypeWithPolymorphism() { + ArrayList l1 = new ArrayList(); + l1.add(1); + LinkedList l2 = new LinkedList(); + l2.add(2); + + @SuppressWarnings("rawtypes") + Observable observable = Observable.from(l1, l2, "123").ofType(List.class); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(l1); + verify(aObserver, times(1)).onNext(l2); + verify(aObserver, never()).onNext("123"); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + } \ No newline at end of file