Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes: Scheduler and Merge #437

Merged
merged 2 commits into from
Oct 16, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 60 additions & 22 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.mockito.Mockito;

import rx.concurrency.TestScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -83,23 +86,23 @@ public abstract class Scheduler {
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param state
*
* @param state
* State to pass into the action.
* @param action
* @param action
* The action to execute periodically.
* @param initialDelay
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final AtomicBoolean complete = new AtomicBoolean();

final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, T state0) {
Expand Down Expand Up @@ -128,7 +131,7 @@ public void call() {
}
});
}

/**
* Schedules a cancelable action to be executed at dueTime.
*
Expand All @@ -150,6 +153,40 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
}
}

/**
* Schedules an action and receives back an action for recursive execution.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
public Subscription schedule(final Action1<Action0> action) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
parentSubscription.add(childSubscription);

final Func2<Scheduler, Func2, Subscription> parentAction = new Func2<Scheduler, Func2, Subscription>() {

@Override
public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
action.call(new Action0() {

@Override
public void call() {
if (!parentSubscription.isUnsubscribed()) {
childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction));
}
}

});
return childSubscription;
}
};

parentSubscription.add(schedule(parentAction, parentAction));

return parentSubscription;
}

/**
* Schedules an action to be executed.
Expand Down Expand Up @@ -187,17 +224,16 @@ public Subscription call(Scheduler scheduler, Void state) {
}, delayTime, unit);
}


/**
* Schedules an action to be executed periodically.
*
* @param action
* @param action
* The action to execute periodically.
* @param initialDelay
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
Expand Down Expand Up @@ -230,39 +266,41 @@ public int degreeOfParallelism() {
}

public static class UnitTest {
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
@SuppressWarnings("unchecked")
// mocking is unchecked, unfortunately
@Test
public void testPeriodicScheduling() {
final Func1<Long, Void> calledOp = mock(Func1.class);

final TestScheduler scheduler = new TestScheduler();
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
@Override public void call() {
@Override
public void call() {
System.out.println(scheduler.now());
calledOp.call(scheduler.now());
}
}, 1, 2, TimeUnit.SECONDS);

verify(calledOp, never()).call(anyLong());

InOrder inOrder = Mockito.inOrder(calledOp);

scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(anyLong());

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(1000L);

scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(3000L);

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(3000L);

scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
inOrder.verify(calledOp, times(1)).call(5000L);
inOrder.verify(calledOp, times(1)).call(7000L);

subscription.unsubscribe();
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
inOrder.verify(calledOp, never()).call(anyLong());
Expand Down
80 changes: 76 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -36,6 +37,10 @@
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Flattens a list of Observables into one Observable sequence, without any transformation.
Expand Down Expand Up @@ -93,6 +98,7 @@ public Subscription onSubscribe(Observer<? super Observable<? extends T>> observ

@Override
public void unsubscribe() {
System.out.println("unsubscribe from merge");
unsubscribed = true;
}

Expand Down Expand Up @@ -125,6 +131,7 @@ private MergeObservable(Observable<? extends Observable<? extends T>> sequences)
}

public Subscription onSubscribe(Observer<? super T> actualObserver) {
CompositeSubscription completeSubscription = new CompositeSubscription();

/**
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
Expand All @@ -134,15 +141,16 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
* Bug report: https://github.com/Netflix/RxJava/issues/200
*/
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
completeSubscription.add(subscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);

/**
* Subscribe to the parent Observable to get to the children Observables
*/
sequences.subscribe(new ParentObserver(synchronizedObserver));
completeSubscription.add(sequences.subscribe(new ParentObserver(synchronizedObserver)));

/* return our subscription to allow unsubscribing */
return subscription;
return completeSubscription;
}

/**
Expand Down Expand Up @@ -380,6 +388,70 @@ public void testUnSubscribe() {
verify(stringObserver, never()).onCompleted();
}

@Test
public void testUnSubscribeObservableOfObservables() throws InterruptedException {

final AtomicBoolean unsubscribed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);

Observable<Observable<Long>> source = Observable.create(new OnSubscribeFunc<Observable<Long>>() {

@Override
public Subscription onSubscribe(final Observer<? super Observable<Long>> observer) {
// verbose on purpose so I can track the inside of it
final Subscription s = Subscriptions.create(new Action0() {

@Override
public void call() {
System.out.println("*** unsubscribed");
unsubscribed.set(true);
}

});

new Thread(new Runnable() {

@Override
public void run() {

while (!unsubscribed.get()) {
observer.onNext(Observable.from(1L, 2L));
}
System.out.println("Done looping after unsubscribe: " + unsubscribed.get());
observer.onCompleted();

// mark that the thread is finished
latch.countDown();
}
}).start();

return s;
};

});

final AtomicInteger count = new AtomicInteger();
Observable.create(merge(source)).take(6).toBlockingObservable().forEach(new Action1<Long>() {

@Override
public void call(Long v) {
System.out.println("Value: " + v);
int c = count.incrementAndGet();
if (c > 6) {
fail("Should be only 6");
}

}
});

latch.await(1000, TimeUnit.MILLISECONDS);

System.out.println("unsubscribed: " + unsubscribed.get());

assertTrue(unsubscribed.get());

}

@Test
public void testMergeArrayWithThreading() {
final TestASynchronousObservable o1 = new TestASynchronousObservable();
Expand Down Expand Up @@ -453,9 +525,9 @@ public void onNext(String v) {
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
// onNext is invoked.

Thread.sleep(300);

try { // in try/finally so threads are released via latch countDown even if assertion fails
assertEquals(1, concurrentCounter.get());
} finally {
Expand Down
22 changes: 22 additions & 0 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.Subscription;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
Expand Down Expand Up @@ -473,6 +474,27 @@ public Subscription onSubscribe(final Observer<? super String> observer) {
fail("Error: " + observer.error.get().getMessage());
}
}

@Test
public void testRecursion() {
TestScheduler s = new TestScheduler();

final AtomicInteger counter = new AtomicInteger(0);

Subscription subscription = s.schedule(new Action1<Action0>() {

@Override
public void call(Action0 self) {
counter.incrementAndGet();
System.out.println("counter: " + counter.get());
self.call();
}

});
subscription.unsubscribe();
assertEquals(0, counter.get());
}


/**
* Used to determine if onNext is being invoked concurrently.
Expand Down