Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Outer/Inner [Preview] #797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,8 +19,8 @@

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func2;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action1;
import android.os.Handler;

/**
Expand All @@ -32,25 +32,29 @@ public class HandlerThreadScheduler extends Scheduler {

/**
* Constructs a {@link HandlerThreadScheduler} using the given {@link Handler}
* @param handler {@link Handler} to use when scheduling actions
*
* @param handler
* {@link Handler} to use when scheduling actions
*/
public HandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

/**
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
* with a delay of zero milliseconds.
*
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
*
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
public Subscription schedule(Action1<Inner> action) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action);
return inner;
}

/**
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
*
* @param state
* State to pass into the action.
* @param action
Expand All @@ -62,17 +66,60 @@ public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ?
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
handler.postDelayed(new Runnable() {
@Override
public void run() {
subscription.wrap(action.call(_scheduler, state));
}
}, unit.toMillis(delayTime));
return subscription;
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action, delayTime, unit);
return inner;
}
}

private static class InnerHandlerThreadScheduler extends Inner {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();
private Inner _inner = this;

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
handler.postDelayed(new Runnable() {
@Override
public void run() {
if (_inner.isUnsubscribed()) {
return;
}
action.call(_inner);
}
}, unit.toMillis(delayTime));
}

@Override
public void schedule(final Action1<Inner> action) {
handler.postDelayed(new Runnable() {

@Override
public void run() {
if (_inner.isUnsubscribed()) {
return;
}
action.call(_inner);
}

}, 0L);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscription;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
import android.os.Handler;

Expand All @@ -38,38 +40,36 @@ public class HandlerThreadSchedulerTest {
@Test
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
final Action1<Inner> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(state, action);
scheduler.schedule(action);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(0L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(scheduler, state);
verify(action).call(any(Inner.class));
}

@Test
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
final Action1<Inner> action = mock(Action1.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(state, action, 1L, TimeUnit.SECONDS);
scheduler.schedule(action, 1L, TimeUnit.SECONDS);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(1000L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(scheduler, state);
verify(action).call(any(Inner.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.util.async.operators.Functionals;
Expand Down Expand Up @@ -592,9 +593,9 @@ public static <R> Func0<Observable<R>> toAsync(final Func0<? extends R> func, fi
@Override
public Observable<R> call() {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call();
Expand Down Expand Up @@ -649,9 +650,9 @@ public static <T1, R> Func1<T1, Observable<R>> toAsync(final Func1<? super T1, ?
@Override
public Observable<R> call(final T1 t1) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1);
Expand Down Expand Up @@ -708,9 +709,9 @@ public static <T1, T2, R> Func2<T1, T2, Observable<R>> toAsync(final Func2<? sup
@Override
public Observable<R> call(final T1 t1, final T2 t2) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2);
Expand Down Expand Up @@ -769,9 +770,9 @@ public static <T1, T2, T3, R> Func3<T1, T2, T3, Observable<R>> toAsync(final Fun
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3);
Expand Down Expand Up @@ -832,9 +833,9 @@ public static <T1, T2, T3, T4, R> Func4<T1, T2, T3, T4, Observable<R>> toAsync(f
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4);
Expand Down Expand Up @@ -897,9 +898,9 @@ public static <T1, T2, T3, T4, T5, R> Func5<T1, T2, T3, T4, T5, Observable<R>> t
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5);
Expand Down Expand Up @@ -964,9 +965,9 @@ public static <T1, T2, T3, T4, T5, T6, R> Func6<T1, T2, T3, T4, T5, T6, Observab
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6);
Expand Down Expand Up @@ -1033,9 +1034,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Func7<T1, T2, T3, T4, T5, T6, T7,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7);
Expand Down Expand Up @@ -1104,9 +1105,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Func8<T1, T2, T3, T4, T5, T6,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8);
Expand Down Expand Up @@ -1177,9 +1178,9 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Func9<T1, T2, T3, T4, T5,
@Override
public Observable<R> call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9);
Expand Down Expand Up @@ -1230,9 +1231,9 @@ public static <R> FuncN<Observable<R>> toAsync(final FuncN<? extends R> func, fi
@Override
public Observable<R> call(final Object... args) {
final AsyncSubject<R> subject = AsyncSubject.create();
scheduler.schedule(new Action0() {
scheduler.schedule(new Action1<Inner>() {
@Override
public void call() {
public void call(Inner inner) {
R result;
try {
result = func.call(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package rx.util.async.operators;

import rx.Scheduler.Inner;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

Expand Down Expand Up @@ -65,22 +66,22 @@ public void call() {
* @param run the Runnable to run when the Action0 is called
* @return the Action0 wrapping the Runnable
*/
public static Action0 fromRunnable(Runnable run) {
public static Action1<Inner> fromRunnable(Runnable run) {
if (run == null) {
throw new NullPointerException("run");
}
return new ActionWrappingRunnable(run);
}
/** An Action0 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action0 {
/** An Action1 which wraps and calls a Runnable. */
private static final class ActionWrappingRunnable implements Action1<Inner> {
final Runnable run;

public ActionWrappingRunnable(Runnable run) {
this.run = run;
}

@Override
public void call() {
public void call(Inner inner) {
run.run();
}

Expand Down
Loading