Skip to content

Commit

Permalink
Merge pull request ReactiveX#380 from jmhofer/distinct-with-comparator
Browse files Browse the repository at this point in the history
Implemented `distinct` and `distinctUntilChanged` variants using a comparator
  • Loading branch information
benjchristensen committed Sep 13, 2013
2 parents a73c6d9 + 64df82d commit 6e03af3
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 31 deletions.
59 changes: 59 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2948,6 +2949,35 @@ public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keyS
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
* a comparator.
*
* @param equalityComparator
* a comparator for deciding whether two emitted items are equal or not
* @return an Observable of sequentially distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229776%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
*/
public <U> Observable<T> distinctUntilChanged(Comparator<T> equalityComparator) {
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
* a key selector function and a comparator.
*
* @param keySelector
* a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
* distinct from another one or not
* @param equalityComparator
* a comparator for deciding whether two emitted item keys are equal or not
* @return an Observable of sequentially distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229533%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
*/
public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator));
}

/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
*
Expand All @@ -2958,6 +2988,19 @@ public Observable<T> distinct() {
return create(OperationDistinct.distinct(this));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a comparator.
*
* @param equalityComparator
* a comparator for deciding whether two emitted items are equal or not
* @return an Observable of distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211630(v=vs.103).aspx">MSDN: Observable.distinct</a>
*/
public <U> Observable<T> distinct(Comparator<T> equalityComparator) {
return create(OperationDistinct.distinct(this, equalityComparator));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a key selector function.
Expand All @@ -2972,6 +3015,22 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector) {
return create(OperationDistinct.distinct(this, keySelector));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a key selector function and a comparator.
*
* @param keySelector
* a function that projects an emitted item to a key value which is used for deciding whether an item is
* distinct from another one or not
* @param equalityComparator
* a comparator for deciding whether two emitted item keys are equal or not
* @return an Observable of distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229050(v=vs.103).aspx">MSDN: Observable.distinct</a>
*/
public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
}

/**
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
* <p>
Expand Down
161 changes: 153 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperationDistinct.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import static rx.Observable.empty;
import static rx.Observable.from;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.junit.Before;
Expand Down Expand Up @@ -57,6 +60,30 @@ public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source,
return new Distinct<T, U>(source, keySelector);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
* The source Observable to emit the distinct items for.
* @param equalityComparator
* The comparator to use for deciding whether to consider two items as equal or not.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<T> distinct(Observable<? extends T> source, Comparator<T> equalityComparator) {
return new DistinctWithComparator<T, T>(source, Functions.<T>identity(), equalityComparator);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
* The source Observable to emit the distinct items for.
* @param equalityComparator
* The comparator to use for deciding whether to consider the two item keys as equal or not.
* @return A subscription function for creating the target Observable.
*/
public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return new DistinctWithComparator<T, U>(source, keySelector, equalityComparator);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
Expand Down Expand Up @@ -93,16 +120,67 @@ public void onError(Throwable e) {

@Override
public void onNext(T next) {
try {
U nextKey = keySelector.call(next);
if (!emittedKeys.contains(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
U nextKey = keySelector.call(next);
if (!emittedKeys.contains(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
}
}
});

return Subscriptions.create(new Action0() {
@Override
public void call() {
sourceSub.unsubscribe();
}
});
}
}

private static class DistinctWithComparator<T, U> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final Func1<? super T, ? extends U> keySelector;
private final Comparator<U> equalityComparator;

private DistinctWithComparator(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
this.source = source;
this.keySelector = keySelector;
this.equalityComparator = equalityComparator;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final Subscription sourceSub = source.subscribe(new Observer<T>() {

// due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here
private final List<U> emittedKeys = new ArrayList<U>();

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T next) {
U nextKey = keySelector.call(next);
if (!alreadyEmitted(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
}
}

private boolean alreadyEmitted(U newKey) {
for (U key: emittedKeys) {
if (equalityComparator.compare(key, newKey) == 0) {
return true;
}
} catch (Throwable t) {
// keySelector is a user function, may throw something
observer.onError(t);
}
return false;
}
});

Expand All @@ -118,15 +196,27 @@ public void call() {
public static class UnitTest {
@Mock
Observer<? super String> w;
@Mock
Observer<? super String> w2;

// nulls lead to exceptions
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
@Override
public String call(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};

final Comparator<String> COMPARE_LENGTH = new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
return s1.length() - s2.length();
}
};

@Before
public void before() {
initMocks(this);
Expand Down Expand Up @@ -182,6 +272,61 @@ public void testDistinctOfNormalSourceWithKeySelector() {
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithComparator() {
Observable<String> src = from("1", "12", "123", "aaa", "321", "12", "21", "1", "12345");
create(distinct(src, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("1");
inOrder.verify(w, times(1)).onNext("12");
inOrder.verify(w, times(1)).onNext("123");
inOrder.verify(w, times(1)).onNext("12345");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithKeySelectorAndComparator() {
Observable<String> src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("x");
inOrder.verify(w, times(1)).onNext("abc");
inOrder.verify(w, times(1)).onNext("abcd");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithKeySelectorAndComparatorAndTwoSubscriptions() {
Observable<String> src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("x");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2);
inOrder.verify(w, times(1)).onNext("abc");
inOrder.verify(w, times(1)).onNext("abcd");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));

InOrder inOrder2 = inOrder(w2);
inOrder2.verify(w2, times(1)).onNext("a");
inOrder2.verify(w2, times(1)).onNext("x");
inOrder2.verify(w2, times(1)).onNext("abc");
inOrder2.verify(w2, times(1)).onNext("abcd");
inOrder2.verify(w2, times(1)).onCompleted();
inOrder2.verify(w2, never()).onNext(anyString());
verify(w2, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfSourceWithNulls() {
Observable<String> src = from(null, "a", "a", null, null, "b", null);
Expand Down
Loading

0 comments on commit 6e03af3

Please sign in to comment.