Skip to content

Commit

Permalink
Update Modules to New Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 4, 2014
1 parent d91794b commit 4bdf08d
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 330 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,8 +19,8 @@

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func2;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action1;
import android.os.Handler;

/**
Expand All @@ -32,25 +32,29 @@ public class HandlerThreadScheduler extends Scheduler {

/**
* Constructs a {@link HandlerThreadScheduler} using the given {@link Handler}
* @param handler {@link Handler} to use when scheduling actions
*
* @param handler
* {@link Handler} to use when scheduling actions
*/
public HandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

/**
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
* with a delay of zero milliseconds.
*
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
*
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
public Subscription schedule(Action1<Inner> action) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action);
return inner;
}

/**
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
*
* @param state
* State to pass into the action.
* @param action
Expand All @@ -62,17 +66,60 @@ public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ?
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
handler.postDelayed(new Runnable() {
@Override
public void run() {
subscription.wrap(action.call(_scheduler, state));
}
}, unit.toMillis(delayTime));
return subscription;
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action, delayTime, unit);
return inner;
}
}

private static class InnerHandlerThreadScheduler extends Inner {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();
private Inner _inner = this;

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
handler.postDelayed(new Runnable() {
@Override
public void run() {
if (_inner.isUnsubscribed()) {
return;
}
action.call(_inner);
}
}, unit.toMillis(delayTime));
}

@Override
public void schedule(final Action1<Inner> action) {
handler.postDelayed(new Runnable() {

@Override
public void run() {
if (_inner.isUnsubscribed()) {
return;
}
action.call(_inner);
}

}, 0L);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscription;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
import android.os.Handler;

Expand All @@ -38,38 +40,36 @@ public class HandlerThreadSchedulerTest {
@Test
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
final Action1<Inner> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(state, action);
scheduler.schedule(action);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(0L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(scheduler, state);
verify(action).call(any(Inner.class));
}

@Test
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
final Action1<Inner> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(state, action, 1L, TimeUnit.SECONDS);
scheduler.schedule(action, 1L, TimeUnit.SECONDS);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(1000L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(scheduler, state);
verify(action).call(any(Inner.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.util.async.operators.Functionals;
Expand Down Expand Up @@ -592,9 +593,9 @@ public static <R> Func0<Observable<R>> toAsync(final Func0<? extends R> func, fi
@Override
public Observable<R> call() {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call();
Expand Down Expand Up @@ -649,9 +650,9 @@ public static <T1, R> Func1<T1, Observable<R>> toAsync(final Func1<? super T1, ?
@Override
public Observable<R> call(final T1 t1) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1);
Expand Down Expand Up @@ -708,9 +709,9 @@ public static <T1, T2, R> Func2<T1, T2, Observable<R>> toAsync(final Func2<? sup
@Override
public Observable<R> call(final T1 t1, final T2 t2) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2);
Expand Down Expand Up @@ -769,9 +770,9 @@ public static <T1, T2, T3, R> Func3<T1, T2, T3, Observable<R>> toAsync(final Fun
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3);
Expand Down Expand Up @@ -832,9 +833,9 @@ public static <T1, T2, T3, T4, R> Func4<T1, T2, T3, T4, Observable<R>> toAsync(f
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4);
Expand Down Expand Up @@ -897,9 +898,9 @@ public static <T1, T2, T3, T4, T5, R> Func5<T1, T2, T3, T4, T5, Observable<R>> t
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5);
Expand Down Expand Up @@ -964,9 +965,9 @@ public static <T1, T2, T3, T4, T5, T6, R> Func6<T1, T2, T3, T4, T5, T6, Observab
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6);
Expand Down Expand Up @@ -1033,9 +1034,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Func7<T1, T2, T3, T4, T5, T6, T7,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7);
Expand Down Expand Up @@ -1104,9 +1105,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Func8<T1, T2, T3, T4, T5, T6,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8);
Expand Down Expand Up @@ -1177,9 +1178,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Func9<T1, T2, T3, T4, T5,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9);
Expand Down Expand Up @@ -1230,9 +1231,9 @@ public static <R> FuncN<Observable<R>> toAsync(final FuncN<? extends R> func, fi
@Override
public Observable<R> call(final Object... args) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package rx.util.async.operators;

import rx.Scheduler.Inner;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

Expand Down Expand Up @@ -65,22 +66,22 @@ public void call() {
* @param run the Runnable to run when the Action0 is called
* @return the Action0 wrapping the Runnable
*/
public static Action0 fromRunnable(Runnable run) {
public static Action1<Inner> fromRunnable(Runnable run) {
if (run == null) {
throw new NullPointerException("run");
}
return new ActionWrappingRunnable(run);
}
/** An Action0 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action0 {
/** An Action1 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action1<Inner> {
final Runnable run;

public ActionWrappingRunnable(Runnable run) {
this.run = run;
}

@Override
public void call() {
public void call(Inner inner) {
run.run();
}

Expand Down
Loading

0 comments on commit 4bdf08d

Please sign in to comment.