Skip to content

Commit

Permalink
Merge pull request #2948 from ReactiveX/TestSubscriberPlus
Browse files Browse the repository at this point in the history
More assertions for TestSubscriber
  • Loading branch information
benjchristensen committed May 19, 2015
2 parents 1a85656 + ff5001b commit 450d9a8
Showing 1 changed file with 180 additions and 22 deletions.
202 changes: 180 additions & 22 deletions src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package rx.observers;

import java.util.List;
import java.util.*;
import java.util.concurrent.*;

import rx.*;
import rx.Observer;
import rx.annotations.Experimental;
import rx.exceptions.CompositeException;

/**
* A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform
Expand All @@ -29,34 +32,72 @@ public class TestSubscriber<T> extends Subscriber<T> {
private final TestObserver<T> testObserver;
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Thread lastSeenThread;
/** Holds the initial request value. */
private final long initialRequest;
/** The shared no-op observer. */
private static final Observer<Object> INERT = new Observer<Object>() {

public TestSubscriber(Subscriber<T> delegate) {
@Override
public void onCompleted() {
// do nothing
}

@Override
public void onError(Throwable e) {
// do nothing
}

@Override
public void onNext(Object t) {
// do nothing
}

};

/**
* Constructs a TestSubscriber with the initial request to be requested from upstream.
* @param initialRequest the initial request value, negative value will revert to the default unbounded behavior
*/
@SuppressWarnings("unchecked")
@Experimental
public TestSubscriber(long initialRequest) {
this((Observer<T>)INERT, initialRequest);
}

/**
* Constructs a TestSubscriber with the initial request to be requested from upstream
* and a delegate Observer to wrap.
* @param initialRequest the initial request value, negative value will revert to the default unbounded behavior
* @param delegate the Observer instance to wrap
*/
@Experimental
public TestSubscriber(Observer<T> delegate, long initialRequest) {
if (delegate == null) {
throw new NullPointerException();
}
this.testObserver = new TestObserver<T>(delegate);
this.initialRequest = initialRequest;
}

public TestSubscriber(Subscriber<T> delegate) {
this(delegate, -1);
}

public TestSubscriber(Observer<T> delegate) {
this.testObserver = new TestObserver<T>(delegate);
this(delegate, -1);
}

public TestSubscriber() {
this.testObserver = new TestObserver<T>(new Observer<T>() {

@Override
public void onCompleted() {
// do nothing
}

@Override
public void onError(Throwable e) {
// do nothing
}

@Override
public void onNext(T t) {
// do nothing
}

});
this(-1);
}

@Override
public void onStart() {
if (initialRequest >= 0) {
requestMore(initialRequest);
} else {
super.onStart();
}
}

/**
Expand Down Expand Up @@ -261,4 +302,121 @@ public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit uni
public Thread getLastSeenThread() {
return lastSeenThread;
}
}

/**
* Assert if there is exactly a single completion event.
*/
@Experimental
public void assertCompleted() {
int s = testObserver.getOnCompletedEvents().size();
if (s == 0) {
throw new AssertionError("Not completed!");
} else
if (s > 1) {
throw new AssertionError("Completed multiple times: " + s);
}
}
/**
* Assert if there is no completion event.
*/
@Experimental
public void assertNotCompleted() {
int s = testObserver.getOnCompletedEvents().size();
if (s == 1) {
throw new AssertionError("Completed!");
} else
if (s > 1) {
throw new AssertionError("Completed multiple times: " + s);
}
}
/**
* Assert if there is exactly one error event which is a subclass of the given class.
* @param clazz the class to check the error against.
*/
@Experimental
public void assertError(Class<? extends Throwable> clazz) {
List<Throwable> err = testObserver.getOnErrorEvents();
if (err.size() == 0) {
throw new AssertionError("No errors");
} else
if (err.size() > 1) {
throw new AssertionError("Multiple errors: " + err.size(), new CompositeException(err));
} else
if (!clazz.isInstance(err.get(0))) {
throw new AssertionError("Exceptions differ; expected: " + clazz + ", actual: " + err.get(0), err.get(0));
}
}
/**
* Assert there is a single onError event with the exact exception.
* @param throwable the throwable to check
*/
@Experimental
public void assertError(Throwable throwable) {
List<Throwable> err = testObserver.getOnErrorEvents();
if (err.size() == 0) {
throw new AssertionError("No errors");
} else
if (err.size() > 1) {
throw new AssertionError("Multiple errors: " + err.size(), new CompositeException(err));
} else
if (throwable.equals(err.get(0))) {
throw new AssertionError("Exceptions differ; expected: " + throwable + ", actual: " + err.get(0), err.get(0));
}
}
/**
* Assert for no onError and onCompleted events.
*/
@Experimental
public void assertNoTerminalEvent() {
List<Throwable> err = testObserver.getOnErrorEvents();
int s = testObserver.getOnCompletedEvents().size();
if (err.size() > 0 || s > 0) {
if (err.isEmpty()) {
throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none");
} else
if (err.size() == 1) {
throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none", err.get(0));
} else {
throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none", new CompositeException(err));
}
}
}
/**
* Assert if there are no onNext events received.
*/
@Experimental
public void assertNoValues() {
int s = testObserver.getOnNextEvents().size();
if (s > 0) {
throw new AssertionError("No onNext events expected yet some received: " + s);
}
}
/**
* Assert if the given number of onNext events are received.
* @param count the expected number of onNext events
*/
@Experimental
public void assertValueCount(int count) {
int s = testObserver.getOnNextEvents().size();
if (s != count) {
throw new AssertionError("Number of onNext events differ; expected: " + count + ", actual: " + s);
}
}

/**
* Assert if the received onNext events, in order, are the specified values.
* @param values the values to check
*/
@Experimental
public void assertValues(T... values) {

This comment has been minimized.

Copy link
@benjchristensen

benjchristensen May 19, 2015

Author Member

I merged this but now realize that this is a different naming convention than the assertReceivedOnNext that it is a cover method for. I understand that it's a shorter name, but now we have two names for the same thing and that hinders discoverability and understanding. The obvious question will then always be "what's the difference?".

I suggest we keep the existing naming as we can't remove assertReceivedOnNext.

assertReceivedOnNext(Arrays.asList(values));
}
/**
* Assert if there is only a single received onNext event.
* @param values the values to check
*/
@Experimental
public void assertValue(T value) {
assertReceivedOnNext(Collections.singletonList(value));
}
}

0 comments on commit 450d9a8

Please sign in to comment.