Skip to content

Commit

Permalink
Small reorganization of code for OperationTake and TrustedObservableT…
Browse files Browse the repository at this point in the history
…ester

- removed rx.testing package (if that's going to exist that means it's bleeding into something that should live in /src/test and beyond what works well for inner class testing)
- made TrustedObservableTester part of rx.operation package and an inner UnitTest class so it doesn't become part of the public API
  • Loading branch information
benjchristensen committed Mar 31, 2013
1 parent 54fcb94 commit 5f852fd
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,75 +1,79 @@
package rx.testing;
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import static org.junit.Assert.*;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;

public class TrustedObservableTester
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Common utility functions for operator implementations and tests.
*/
/* package */class AbstractOperation
{
private TrustedObservableTester() {}
private AbstractOperation() {
}

public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
{
return new Func1<Observer<T>, Subscription>()
public static class UnitTest {

public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
{
@Override
public Subscription call(Observer<T> observer)
return new Func1<Observer<T>, Subscription>()
{
return source.call(new TestingObserver<T>(observer));
}
};
}
@Override
public Subscription call(Observer<T> observer)
{
return source.call(new TestingObserver<T>(observer));
}
};
}

public static class TestingObserver<T> implements Observer<T> {
public static class TestingObserver<T> implements Observer<T> {

private final Observer<T> actual;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final AtomicBoolean isInCallback = new AtomicBoolean(false);
private final Observer<T> actual;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final AtomicBoolean isInCallback = new AtomicBoolean(false);

public TestingObserver(Observer<T> actual) {
this.actual = actual;
}
public TestingObserver(Observer<T> actual) {
this.actual = actual;
}

@Override
public void onCompleted() {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onCompleted();
isInCallback.set(false);
}
@Override
public void onCompleted() {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onCompleted();
isInCallback.set(false);
}

@Override
public void onError(Exception e) {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onError(e);
isInCallback.set(false);
}
@Override
public void onError(Exception e) {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onError(e);
isInCallback.set(false);
}

@Override
public void onNext(T args) {
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onNext(args);
isInCallback.set(false);
}
@Override
public void onNext(T args) {
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onNext(args);
isInCallback.set(false);
}

}
}

public static class UnitTest {
@Test(expected = AssertionError.class)
public void testDoubleCompleted() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
Expand Down Expand Up @@ -141,7 +145,6 @@ public Subscription call(Observer<String> observer)
})).lastOrDefault("end");
}


@Test(expected = AssertionError.class)
public void testErrorNext() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
Expand Down Expand Up @@ -250,4 +253,4 @@ public void onNext(String args)
}
}
}
}
}
21 changes: 9 additions & 12 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,23 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.AbstractOperation.UnitTest.*;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.testing.TrustedObservableTester.assertTrustedObservable;

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*/
Expand Down

0 comments on commit 5f852fd

Please sign in to comment.