From 71e1f9eaff3a1d8a5f4f0a81e936cf6f0d1eb089 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 11:00:35 +0200 Subject: [PATCH 1/6] initial introduction of an equality comparator into distinct implementation, still needs test --- .../OperationDistinctUntilChanged.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index 25c09a5f90d..a7be2c9c669 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -22,6 +22,8 @@ import static rx.Observable.empty; import static rx.Observable.from; +import java.util.Comparator; + import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -41,6 +43,10 @@ */ public final class OperationDistinctUntilChanged { + public static OnSubscribeFunc distinctUntilChanged(Observable source, Comparator equalityComparator) { + return new DistinctUntilChanged(source, Functions.identity(), equalityComparator); + } + /** * Returns an Observable that emits all sequentially distinct items emitted by the source. * @param source @@ -48,7 +54,7 @@ public final class OperationDistinctUntilChanged { * @return A subscription function for creating the target Observable. */ public static OnSubscribeFunc distinctUntilChanged(Observable source, Func1 keySelector) { - return new DistinctUntilChanged(source, keySelector); + return new DistinctUntilChanged(source, keySelector, new DefaultEqualityComparator()); } /** @@ -58,16 +64,30 @@ public static OnSubscribeFunc distinctUntilChanged(Observable OnSubscribeFunc distinctUntilChanged(Observable source) { - return new DistinctUntilChanged(source, Functions.identity()); + return new DistinctUntilChanged(source, Functions.identity(), new DefaultEqualityComparator()); + } + + // does not define a useful ordering; it's only used for equality tests here + private static class DefaultEqualityComparator implements Comparator { + @Override + public int compare(T t1, T t2) { + if (t1 == null) { + return t2 == null ? 0 : 1; + } else { + return t1.equals(t2) ? 0 : 1; + } + } } private static class DistinctUntilChanged implements OnSubscribeFunc { private final Observable source; private final Func1 keySelector; + private final Comparator equalityComparator; - private DistinctUntilChanged(Observable source, Func1 keySelector) { + private DistinctUntilChanged(Observable source, Func1 keySelector, Comparator equalityComparator) { this.source = source; this.keySelector = keySelector; + this.equalityComparator = equalityComparator; } @Override @@ -95,16 +115,8 @@ public void onNext(T next) { if (!hasEmitted) { hasEmitted = true; observer.onNext(next); - } else { - if (lastKey == null) { - if (nextKey != null) { - observer.onNext(next); - } - } else { - if (!lastKey.equals(nextKey)) { - observer.onNext(next); - } - } + } else if (equalityComparator.compare(lastKey, nextKey) != 0) { + observer.onNext(next); } } catch (Throwable t) { // keySelector is a user function, may throw something From dbf7084208da42afbca1dff6b65cdbb4d0ca2af7 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 11:18:12 +0200 Subject: [PATCH 2/6] added test against comparator usage --- .../OperationDistinctUntilChanged.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index a7be2c9c669..687b7503553 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -146,6 +146,13 @@ public String call(String s) { } }; + final Comparator COMPARE_LENGTH = new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.length() - s2.length(); + } + }; + @Before public void before() { initMocks(this); @@ -233,5 +240,19 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() { inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, never()).onCompleted(); } + + @Test + public void testDistinctUntilChangedWithComparator() { + Observable src = from("a", "b", "c", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("aa"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } } } From b7e80bd8cfcafa3dbaad30da7c1bc50b618dfa60 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 11:53:17 +0200 Subject: [PATCH 3/6] added both new variants to observable --- rxjava-core/src/main/java/rx/Observable.java | 30 ++++++++++++++ .../OperationDistinctUntilChanged.java | 41 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c0ecf6226ff..0d8168a960e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -2948,6 +2949,35 @@ public Observable distinctUntilChanged(Func1 keyS return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to + * a comparator. + * + * @param equalityComparator + * a comparator for deciding whether two emitted items are equal or not + * @return an Observable of sequentially distinct items + * @see MSDN: Observable.distinctUntilChanged + */ + public Observable distinctUntilChanged(Comparator equalityComparator) { + return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator)); + } + + /** + * Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to + * a key selector function and a comparator. + * + * @param keySelector + * a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially + * distinct from another one or not + * @param equalityComparator + * a comparator for deciding whether two emitted item keys are equal or not + * @return an Observable of sequentially distinct items + * @see MSDN: Observable.distinctUntilChanged + */ + public Observable distinctUntilChanged(Func1 keySelector, Comparator equalityComparator) { + return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator)); + } + /** * Returns an Observable that forwards all distinct items emitted from the source Observable. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index 687b7503553..cd18ee7123b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -43,6 +43,14 @@ */ public final class OperationDistinctUntilChanged { + /** + * Returns an Observable that emits all sequentially distinct items emitted by the source. + * @param source + * The source Observable to emit the sequentially distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider two items as equal or not. + * @return A subscription function for creating the target Observable. + */ public static OnSubscribeFunc distinctUntilChanged(Observable source, Comparator equalityComparator) { return new DistinctUntilChanged(source, Functions.identity(), equalityComparator); } @@ -51,6 +59,22 @@ public static OnSubscribeFunc distinctUntilChanged(Observable OnSubscribeFunc distinctUntilChanged(Observable source, Func1 keySelector, Comparator equalityComparator) { + return new DistinctUntilChanged(source, keySelector, equalityComparator); + } + + /** + * Returns an Observable that emits all sequentially distinct items emitted by the source. + * @param source + * The source Observable to emit the sequentially distinct items for. + * @param keySelector + * The function to select the key to use for the equality checks. * @return A subscription function for creating the target Observable. */ public static OnSubscribeFunc distinctUntilChanged(Observable source, Func1 keySelector) { @@ -142,6 +166,9 @@ public static class UnitTest { final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @Override public String call(String s) { + if (s.equals("x")) { + return "xx"; + } return s.toUpperCase(); } }; @@ -254,5 +281,19 @@ public void testDistinctUntilChangedWithComparator() { inOrder.verify(w, never()).onNext(anyString()); verify(w, never()).onError(any(Throwable.class)); } + + @Test + public void testDistinctUntilChangedWithComparatorAndKeySelector() { + Observable src = from("a", "b", "x", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } } } From 5d40981e8491d0bf4b1e0d0208b255f388bb75e3 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 12:24:07 +0200 Subject: [PATCH 4/6] added comparator versions of distinct operation, too --- rxjava-core/src/main/java/rx/Observable.java | 33 ++++- .../java/rx/operators/OperationDistinct.java | 128 ++++++++++++++++++ 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0d8168a960e..85c0359b3f7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2956,7 +2956,7 @@ public Observable distinctUntilChanged(Func1 keyS * @param equalityComparator * a comparator for deciding whether two emitted items are equal or not * @return an Observable of sequentially distinct items - * @see MSDN: Observable.distinctUntilChanged + * @see MSDN: Observable.distinctUntilChanged */ public Observable distinctUntilChanged(Comparator equalityComparator) { return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator)); @@ -2972,7 +2972,7 @@ public Observable distinctUntilChanged(Comparator equalityComparator) * @param equalityComparator * a comparator for deciding whether two emitted item keys are equal or not * @return an Observable of sequentially distinct items - * @see MSDN: Observable.distinctUntilChanged + * @see MSDN: Observable.distinctUntilChanged */ public Observable distinctUntilChanged(Func1 keySelector, Comparator equalityComparator) { return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator)); @@ -2988,6 +2988,19 @@ public Observable distinct() { return create(OperationDistinct.distinct(this)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to + * a comparator. + * + * @param equalityComparator + * a comparator for deciding whether two emitted items are equal or not + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ + public Observable distinct(Comparator equalityComparator) { + return create(OperationDistinct.distinct(this, equalityComparator)); + } + /** * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to * a key selector function. @@ -3002,6 +3015,22 @@ public Observable distinct(Func1 keySelector) { return create(OperationDistinct.distinct(this, keySelector)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to + * a key selector function and a comparator. + * + * @param keySelector + * a function that projects an emitted item to a key value which is used for deciding whether an item is + * distinct from another one or not + * @param equalityComparator + * a comparator for deciding whether two emitted item keys are equal or not + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ + public Observable distinct(Func1 keySelector, Comparator equalityComparator) { + return create(OperationDistinct.distinct(this, keySelector, equalityComparator)); + } + /** * Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java index b9cb0c6ebef..6c5d8208047 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java @@ -22,7 +22,10 @@ import static rx.Observable.empty; import static rx.Observable.from; +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.junit.Before; @@ -57,6 +60,30 @@ public static OnSubscribeFunc distinct(Observable source, return new Distinct(source, keySelector); } + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider two items as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source, Comparator equalityComparator) { + return new DistinctWithComparator(source, Functions.identity(), equalityComparator); + } + + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider the two item keys as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source, Func1 keySelector, Comparator equalityComparator) { + return new DistinctWithComparator(source, keySelector, equalityComparator); + } + /** * Returns an Observable that emits all distinct items emitted by the source * @param source @@ -115,6 +142,67 @@ public void call() { } } + private static class DistinctWithComparator implements OnSubscribeFunc { + private final Observable source; + private final Func1 keySelector; + private final Comparator equalityComparator; + + private DistinctWithComparator(Observable source, Func1 keySelector, Comparator equalityComparator) { + this.source = source; + this.keySelector = keySelector; + this.equalityComparator = equalityComparator; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + + // due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here + private final List emittedKeys = new ArrayList(); + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + try { + U nextKey = keySelector.call(next); + if (!alreadyEmitted(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); + } + } catch (Throwable t) { + // keySelector and comparator are user functions, may throw something + observer.onError(t); + } + } + + private boolean alreadyEmitted(U newKey) { + for (U key: emittedKeys) { + if (equalityComparator.compare(key, newKey) == 0) { + return true; + } + } + return false; + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + public static class UnitTest { @Mock Observer w; @@ -123,10 +211,20 @@ public static class UnitTest { final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @Override public String call(String s) { + if (s.equals("x")) { + return "XX"; + } return s.toUpperCase(); } }; + final Comparator COMPARE_LENGTH = new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.length() - s2.length(); + } + }; + @Before public void before() { initMocks(this); @@ -182,6 +280,36 @@ public void testDistinctOfNormalSourceWithKeySelector() { verify(w, never()).onError(any(Throwable.class)); } + @Test + public void testDistinctOfNormalSourceWithComparator() { + Observable src = from("1", "12", "123", "aaa", "321", "12", "21", "1", "12345"); + create(distinct(src, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("1"); + inOrder.verify(w, times(1)).onNext("12"); + inOrder.verify(w, times(1)).onNext("123"); + inOrder.verify(w, times(1)).onNext("12345"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfNormalSourceWithKeySelectorAndComparator() { + Observable src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + inOrder.verify(w, times(1)).onNext("abc"); + inOrder.verify(w, times(1)).onNext("abcd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + @Test public void testDistinctOfSourceWithNulls() { Observable src = from(null, "a", "a", null, null, "b", null); From 6680dc319d627827c8eb31a5cbf5898257441964 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 12:31:00 +0200 Subject: [PATCH 5/6] added tests against multiple subscriptions, just to be on the safe side --- .../java/rx/operators/OperationDistinct.java | 27 +++++++++++++++++++ .../OperationDistinctUntilChanged.java | 26 ++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java index 6c5d8208047..740611c044a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java @@ -206,6 +206,8 @@ public void call() { public static class UnitTest { @Mock Observer w; + @Mock + Observer w2; // nulls lead to exceptions final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @@ -310,6 +312,31 @@ public void testDistinctOfNormalSourceWithKeySelectorAndComparator() { verify(w, never()).onError(any(Throwable.class)); } + @Test + public void testDistinctOfNormalSourceWithKeySelectorAndComparatorAndTwoSubscriptions() { + Observable src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2); + inOrder.verify(w, times(1)).onNext("abc"); + inOrder.verify(w, times(1)).onNext("abcd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + + InOrder inOrder2 = inOrder(w2); + inOrder2.verify(w2, times(1)).onNext("a"); + inOrder2.verify(w2, times(1)).onNext("x"); + inOrder2.verify(w2, times(1)).onNext("abc"); + inOrder2.verify(w2, times(1)).onNext("abcd"); + inOrder2.verify(w2, times(1)).onCompleted(); + inOrder2.verify(w2, never()).onNext(anyString()); + verify(w2, never()).onError(any(Throwable.class)); + } + @Test public void testDistinctOfSourceWithNulls() { Observable src = from(null, "a", "a", null, null, "b", null); diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index cd18ee7123b..a886445261d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -161,6 +161,8 @@ public void call() { public static class UnitTest { @Mock Observer w; + @Mock + Observer w2; // nulls lead to exceptions final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @@ -295,5 +297,29 @@ public void testDistinctUntilChangedWithComparatorAndKeySelector() { inOrder.verify(w, never()).onNext(anyString()); verify(w, never()).onError(any(Throwable.class)); } + + @Test + public void testDistinctUntilChangedWithComparatorAndKeySelectorandTwoSubscriptions() { + Observable src = from("a", "b", "x", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + + InOrder inOrder2 = inOrder(w2); + inOrder2.verify(w2, times(1)).onNext("a"); + inOrder2.verify(w2, times(1)).onNext("x"); + inOrder2.verify(w2, times(1)).onNext("c"); + inOrder2.verify(w2, times(1)).onNext("ddd"); + inOrder2.verify(w2, times(1)).onCompleted(); + inOrder2.verify(w2, never()).onNext(anyString()); + verify(w2, never()).onError(any(Throwable.class)); + } } } From 64df82d7fff0e55fcb8d36cef9d47ea4f70495df Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 14:12:42 +0200 Subject: [PATCH 6/6] simplified error handling as this is already done on the outside --- .../java/rx/operators/OperationDistinct.java | 26 ++++++------------- .../OperationDistinctUntilChanged.java | 19 +++++--------- 2 files changed, 15 insertions(+), 30 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java index 740611c044a..b56f0cd22ff 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java @@ -120,15 +120,10 @@ public void onError(Throwable e) { @Override public void onNext(T next) { - try { - U nextKey = keySelector.call(next); - if (!emittedKeys.contains(nextKey)) { - emittedKeys.add(nextKey); - observer.onNext(next); - } - } catch (Throwable t) { - // keySelector is a user function, may throw something - observer.onError(t); + U nextKey = keySelector.call(next); + if (!emittedKeys.contains(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); } } }); @@ -172,15 +167,10 @@ public void onError(Throwable e) { @Override public void onNext(T next) { - try { - U nextKey = keySelector.call(next); - if (!alreadyEmitted(nextKey)) { - emittedKeys.add(nextKey); - observer.onNext(next); - } - } catch (Throwable t) { - // keySelector and comparator are user functions, may throw something - observer.onError(t); + U nextKey = keySelector.call(next); + if (!alreadyEmitted(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index a886445261d..c43edc02021 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -133,18 +133,13 @@ public void onError(Throwable e) { @Override public void onNext(T next) { U lastKey = lastEmittedKey; - try { - U nextKey = keySelector.call(next); - lastEmittedKey = nextKey; - if (!hasEmitted) { - hasEmitted = true; - observer.onNext(next); - } else if (equalityComparator.compare(lastKey, nextKey) != 0) { - observer.onNext(next); - } - } catch (Throwable t) { - // keySelector is a user function, may throw something - observer.onError(t); + U nextKey = keySelector.call(next); + lastEmittedKey = nextKey; + if (!hasEmitted) { + hasEmitted = true; + observer.onNext(next); + } else if (equalityComparator.compare(lastKey, nextKey) != 0) { + observer.onNext(next); } } });