Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling actions periodically #246

Merged
merged 11 commits into from
May 1, 2013
177 changes: 171 additions & 6 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@
*/
package rx;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
Expand Down Expand Up @@ -71,6 +80,56 @@ public abstract class Scheduler {
*/
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param state
* State to pass into the action.
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final AtomicBoolean complete = new AtomicBoolean();

final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, T state0) {
if (!complete.get()) {
long startedAt = now();
final Subscription sub1 = action.call(scheduler, state0);
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
return Subscriptions.create(new Action0() {
@Override
public void call() {
sub1.unsubscribe();
sub2.unsubscribe();
}
});
}
return Subscriptions.empty();
}
};
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
return Subscriptions.create(new Action0() {
@Override
public void call() {
complete.set(true);
sub.unsubscribe();
}
});
}

/**
* Schedules a cancelable action to be executed at dueTime.
*
Expand Down Expand Up @@ -103,7 +162,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
});
Expand All @@ -120,7 +179,7 @@ public Subscription schedule(final Func0<Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
});
Expand All @@ -137,7 +196,7 @@ public Subscription schedule(final Action0 action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
Expand All @@ -159,7 +218,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action, long d
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
}, delayTime, unit);
Expand All @@ -176,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
Expand All @@ -194,17 +253,123 @@ public Subscription schedule(final Func0<Subscription> action, long delayTime, T
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
}, delayTime, unit);
}

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
}, initialDelay, period, unit);
}

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
}, initialDelay, period, unit);
}

/**
* Schedules an action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
}, initialDelay, period, unit);
}

/**
* Returns the scheduler's notion of current absolute time in milliseconds.
*/
public long now() {
return System.currentTimeMillis();
}

public static class UnitTest {
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
@Test
public void testPeriodicScheduling() {
final Func1<Long, Void> calledOp = mock(Func1.class);

final TestScheduler scheduler = new TestScheduler();
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
@Override public void call() {
System.out.println(scheduler.now());
calledOp.call(scheduler.now());
}
}, 1, 2, TimeUnit.SECONDS);

verify(calledOp, never()).call(anyLong());

InOrder inOrder = Mockito.inOrder(calledOp);

scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(anyLong());

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(1000L);

scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(3000L);

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(3000L);

scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
inOrder.verify(calledOp, times(1)).call(5000L);
inOrder.verify(calledOp, times(1)).call(7000L);

subscription.unsubscribe();
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
inOrder.verify(calledOp, never()).call(anyLong());
}
}
}
21 changes: 21 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
this.executor = executor;
}

@Override
public <T> Subscription schedulePeriodically(final T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
final CompositeSubscription subscriptions = new CompositeSubscription();

ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Subscription s = action.call(ExecutorScheduler.this, state);
subscriptions.add(s);
}
}, initialDelay, period, unit);

subscriptions.add(Subscriptions.create(f));
return subscriptions;

} else {
return super.schedulePeriodically(state, action, initialDelay, period, unit);
}
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
Expand Down
6 changes: 4 additions & 2 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ public int compare(TimedAction<?> action1, TimedAction<?> action2) {
}
}

// Storing time in nanoseconds internally.
private long time;

@Override
public long now() {
return time;
return TimeUnit.NANOSECONDS.toMillis(time);
}

public void advanceTimeBy(long delayTime, TimeUnit unit) {
Expand All @@ -79,6 +80,7 @@ private void triggerActions(long targetTimeInNanos) {
while (!queue.isEmpty()) {
TimedAction<?> current = queue.peek();
if (current.time > targetTimeInNanos) {
time = targetTimeInNanos;
break;
}
time = current.time;
Expand All @@ -95,7 +97,7 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, now() + unit.toNanos(delayTime), action, state));
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
}
}
37 changes: 12 additions & 25 deletions rxjava-core/src/main/java/rx/operators/OperationInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -57,47 +56,35 @@ public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUn
}

private static class Interval implements Func1<Observer<Long>, Subscription> {
private final long interval;
private final long period;
private final TimeUnit unit;
private final Scheduler scheduler;

private long currentValue;
private final AtomicBoolean complete = new AtomicBoolean();

private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
this.interval = interval;
private Interval(long period, TimeUnit unit, Scheduler scheduler) {
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public Subscription call(final Observer<Long> observer) {
scheduler.schedule(new IntervalAction(observer), interval, unit);
return Subscriptions.create(new Action0() {
final Subscription wrapped = scheduler.schedulePeriodically(new Action0() {
@Override
public void call() {
complete.set(true);
observer.onNext(currentValue);
currentValue++;
}
});
}

private class IntervalAction implements Action0 {
private final Observer<Long> observer;

private IntervalAction(Observer<Long> observer) {
this.observer = observer;
}
}, period, period, unit);

@Override
public void call() {
if (complete.get()) {
return Subscriptions.create(new Action0() {
@Override
public void call() {
wrapped.unsubscribe();
observer.onCompleted();
} else {
observer.onNext(currentValue);
currentValue++;
scheduler.schedule(this, interval, unit);
}
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.Observer;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
Expand Down Expand Up @@ -174,9 +175,8 @@ public Boolean call(Integer input)

@Test
public void testTakeWhileOnSubject1() {
PublishSubject<Integer> s = PublishSubject.create();
Observable<Integer> w = (Observable<Integer>) s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
Subject<Integer, Integer> s = PublishSubject.create();
Observable<Integer> take = Observable.create(takeWhile(s, new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer input)
Expand Down
Loading