diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c0ecf6226ff..85c0359b3f7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -2948,6 +2949,35 @@ public Observable distinctUntilChanged(Func1 keyS return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to + * a comparator. + * + * @param equalityComparator + * a comparator for deciding whether two emitted items are equal or not + * @return an Observable of sequentially distinct items + * @see MSDN: Observable.distinctUntilChanged + */ + public Observable distinctUntilChanged(Comparator equalityComparator) { + return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator)); + } + + /** + * Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to + * a key selector function and a comparator. + * + * @param keySelector + * a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially + * distinct from another one or not + * @param equalityComparator + * a comparator for deciding whether two emitted item keys are equal or not + * @return an Observable of sequentially distinct items + * @see MSDN: Observable.distinctUntilChanged + */ + public Observable distinctUntilChanged(Func1 keySelector, Comparator equalityComparator) { + return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator)); + } + /** * Returns an Observable that forwards all distinct items emitted from the source Observable. * @@ -2958,6 +2988,19 @@ public Observable distinct() { return create(OperationDistinct.distinct(this)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to + * a comparator. + * + * @param equalityComparator + * a comparator for deciding whether two emitted items are equal or not + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ + public Observable distinct(Comparator equalityComparator) { + return create(OperationDistinct.distinct(this, equalityComparator)); + } + /** * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to * a key selector function. @@ -2972,6 +3015,22 @@ public Observable distinct(Func1 keySelector) { return create(OperationDistinct.distinct(this, keySelector)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to + * a key selector function and a comparator. + * + * @param keySelector + * a function that projects an emitted item to a key value which is used for deciding whether an item is + * distinct from another one or not + * @param equalityComparator + * a comparator for deciding whether two emitted item keys are equal or not + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ + public Observable distinct(Func1 keySelector, Comparator equalityComparator) { + return create(OperationDistinct.distinct(this, keySelector, equalityComparator)); + } + /** * Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java index b9cb0c6ebef..b56f0cd22ff 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java @@ -22,7 +22,10 @@ import static rx.Observable.empty; import static rx.Observable.from; +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.junit.Before; @@ -57,6 +60,30 @@ public static OnSubscribeFunc distinct(Observable source, return new Distinct(source, keySelector); } + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider two items as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source, Comparator equalityComparator) { + return new DistinctWithComparator(source, Functions.identity(), equalityComparator); + } + + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider the two item keys as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source, Func1 keySelector, Comparator equalityComparator) { + return new DistinctWithComparator(source, keySelector, equalityComparator); + } + /** * Returns an Observable that emits all distinct items emitted by the source * @param source @@ -93,16 +120,67 @@ public void onError(Throwable e) { @Override public void onNext(T next) { - try { - U nextKey = keySelector.call(next); - if (!emittedKeys.contains(nextKey)) { - emittedKeys.add(nextKey); - observer.onNext(next); + U nextKey = keySelector.call(next); + if (!emittedKeys.contains(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); + } + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + + private static class DistinctWithComparator implements OnSubscribeFunc { + private final Observable source; + private final Func1 keySelector; + private final Comparator equalityComparator; + + private DistinctWithComparator(Observable source, Func1 keySelector, Comparator equalityComparator) { + this.source = source; + this.keySelector = keySelector; + this.equalityComparator = equalityComparator; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + + // due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here + private final List emittedKeys = new ArrayList(); + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + U nextKey = keySelector.call(next); + if (!alreadyEmitted(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); + } + } + + private boolean alreadyEmitted(U newKey) { + for (U key: emittedKeys) { + if (equalityComparator.compare(key, newKey) == 0) { + return true; } - } catch (Throwable t) { - // keySelector is a user function, may throw something - observer.onError(t); } + return false; } }); @@ -118,15 +196,27 @@ public void call() { public static class UnitTest { @Mock Observer w; + @Mock + Observer w2; // nulls lead to exceptions final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @Override public String call(String s) { + if (s.equals("x")) { + return "XX"; + } return s.toUpperCase(); } }; + final Comparator COMPARE_LENGTH = new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.length() - s2.length(); + } + }; + @Before public void before() { initMocks(this); @@ -182,6 +272,61 @@ public void testDistinctOfNormalSourceWithKeySelector() { verify(w, never()).onError(any(Throwable.class)); } + @Test + public void testDistinctOfNormalSourceWithComparator() { + Observable src = from("1", "12", "123", "aaa", "321", "12", "21", "1", "12345"); + create(distinct(src, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("1"); + inOrder.verify(w, times(1)).onNext("12"); + inOrder.verify(w, times(1)).onNext("123"); + inOrder.verify(w, times(1)).onNext("12345"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfNormalSourceWithKeySelectorAndComparator() { + Observable src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + inOrder.verify(w, times(1)).onNext("abc"); + inOrder.verify(w, times(1)).onNext("abcd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfNormalSourceWithKeySelectorAndComparatorAndTwoSubscriptions() { + Observable src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2); + inOrder.verify(w, times(1)).onNext("abc"); + inOrder.verify(w, times(1)).onNext("abcd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + + InOrder inOrder2 = inOrder(w2); + inOrder2.verify(w2, times(1)).onNext("a"); + inOrder2.verify(w2, times(1)).onNext("x"); + inOrder2.verify(w2, times(1)).onNext("abc"); + inOrder2.verify(w2, times(1)).onNext("abcd"); + inOrder2.verify(w2, times(1)).onCompleted(); + inOrder2.verify(w2, never()).onNext(anyString()); + verify(w2, never()).onError(any(Throwable.class)); + } + @Test public void testDistinctOfSourceWithNulls() { Observable src = from(null, "a", "a", null, null, "b", null); diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java index 25c09a5f90d..c43edc02021 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java @@ -22,6 +22,8 @@ import static rx.Observable.empty; import static rx.Observable.from; +import java.util.Comparator; + import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -45,10 +47,38 @@ public final class OperationDistinctUntilChanged { * Returns an Observable that emits all sequentially distinct items emitted by the source. * @param source * The source Observable to emit the sequentially distinct items for. + * @param equalityComparator + * The comparator to use for deciding whether to consider two items as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinctUntilChanged(Observable source, Comparator equalityComparator) { + return new DistinctUntilChanged(source, Functions.identity(), equalityComparator); + } + + /** + * Returns an Observable that emits all sequentially distinct items emitted by the source. + * @param source + * The source Observable to emit the sequentially distinct items for. + * @param keySelector + * The function to select the key to use for the equality checks. + * @param equalityComparator + * The comparator to use for deciding whether to consider the two item keys as equal or not. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinctUntilChanged(Observable source, Func1 keySelector, Comparator equalityComparator) { + return new DistinctUntilChanged(source, keySelector, equalityComparator); + } + + /** + * Returns an Observable that emits all sequentially distinct items emitted by the source. + * @param source + * The source Observable to emit the sequentially distinct items for. + * @param keySelector + * The function to select the key to use for the equality checks. * @return A subscription function for creating the target Observable. */ public static OnSubscribeFunc distinctUntilChanged(Observable source, Func1 keySelector) { - return new DistinctUntilChanged(source, keySelector); + return new DistinctUntilChanged(source, keySelector, new DefaultEqualityComparator()); } /** @@ -58,16 +88,30 @@ public static OnSubscribeFunc distinctUntilChanged(Observable OnSubscribeFunc distinctUntilChanged(Observable source) { - return new DistinctUntilChanged(source, Functions.identity()); + return new DistinctUntilChanged(source, Functions.identity(), new DefaultEqualityComparator()); + } + + // does not define a useful ordering; it's only used for equality tests here + private static class DefaultEqualityComparator implements Comparator { + @Override + public int compare(T t1, T t2) { + if (t1 == null) { + return t2 == null ? 0 : 1; + } else { + return t1.equals(t2) ? 0 : 1; + } + } } private static class DistinctUntilChanged implements OnSubscribeFunc { private final Observable source; private final Func1 keySelector; + private final Comparator equalityComparator; - private DistinctUntilChanged(Observable source, Func1 keySelector) { + private DistinctUntilChanged(Observable source, Func1 keySelector, Comparator equalityComparator) { this.source = source; this.keySelector = keySelector; + this.equalityComparator = equalityComparator; } @Override @@ -89,26 +133,13 @@ public void onError(Throwable e) { @Override public void onNext(T next) { U lastKey = lastEmittedKey; - try { - U nextKey = keySelector.call(next); - lastEmittedKey = nextKey; - if (!hasEmitted) { - hasEmitted = true; - observer.onNext(next); - } else { - if (lastKey == null) { - if (nextKey != null) { - observer.onNext(next); - } - } else { - if (!lastKey.equals(nextKey)) { - observer.onNext(next); - } - } - } - } catch (Throwable t) { - // keySelector is a user function, may throw something - observer.onError(t); + U nextKey = keySelector.call(next); + lastEmittedKey = nextKey; + if (!hasEmitted) { + hasEmitted = true; + observer.onNext(next); + } else if (equalityComparator.compare(lastKey, nextKey) != 0) { + observer.onNext(next); } } }); @@ -125,15 +156,27 @@ public void call() { public static class UnitTest { @Mock Observer w; + @Mock + Observer w2; // nulls lead to exceptions final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @Override public String call(String s) { + if (s.equals("x")) { + return "xx"; + } return s.toUpperCase(); } }; + final Comparator COMPARE_LENGTH = new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.length() - s2.length(); + } + }; + @Before public void before() { initMocks(this); @@ -221,5 +264,57 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() { inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, never()).onCompleted(); } + + @Test + public void testDistinctUntilChangedWithComparator() { + Observable src = from("a", "b", "c", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("aa"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctUntilChangedWithComparatorAndKeySelector() { + Observable src = from("a", "b", "x", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctUntilChangedWithComparatorAndKeySelectorandTwoSubscriptions() { + Observable src = from("a", "b", "x", "aa", "bb", "c", "ddd"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("x"); + create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("ddd"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + + InOrder inOrder2 = inOrder(w2); + inOrder2.verify(w2, times(1)).onNext("a"); + inOrder2.verify(w2, times(1)).onNext("x"); + inOrder2.verify(w2, times(1)).onNext("c"); + inOrder2.verify(w2, times(1)).onNext("ddd"); + inOrder2.verify(w2, times(1)).onCompleted(); + inOrder2.verify(w2, never()).onNext(anyString()); + verify(w2, never()).onError(any(Throwable.class)); + } } }